package com.demo.flink;

import com.plat.paas.kafka.Message;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.File;
import java.io.FileReader;
import java.io.LineNumberReader;
import java.net.URL;

/**
 * @author : pengjie
 * @PackageName : com.demo.flink
 * @Description : Kafka 数据源DEMO
 * @email : 627799251@qq.com
 * @Date : 2019/1/29 16:54
 */
public class SimpleStringGenerator implements SourceFunction<Message> {
    @Override
    public void run(SourceContext sourceContext) throws Exception {

        while (true){
            URL fileUrl = HotItems.class.getClassLoader().getResource("UserBehavior1.csv");
            File file = new File(fileUrl.toURI());
            FileReader in = new FileReader(file);
            LineNumberReader reader = new LineNumberReader(in);
            String line;
            while ((line = reader.readLine()) != null){
                System.out.println("lint = " + line);
                Message msg = new Message();
                msg.setId((int)(Math.random()*10000));
                msg.setTopic("flink-test2");
                msg.setMsg(line);
                sourceContext.collectWithTimestamp(msg, System.currentTimeMillis());
                Thread.sleep(1000);

            }
        }
    }

    @Override
    public void cancel() {

    }
}
